#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <pthread.h>
#include <getopt.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "limits.h"
#include "adt.h"
#include "ynet_rpc.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "chunk.h"
#include "bmap.h"
#include "metadata.h"
#include "lichbd.h"
#include "lichbd_rpc.h"
#include "../../ynet/sock/sock_tcp.h"
#include "rpc_table.h"
#include "rpc_proto.h"
#include "net_table.h"
#include "configure.h"
#include "net_global.h"
#include "lich_md.h"
#include "lichstor.h"
#include "sock_unix.h"
#include "squeue.h"
#include "cache.h"
#include "prof.h"
#include "ylog.h"
#include "vm.h"
#include "dbg.h"

#define MSG_SIZE 64

typedef struct {
        sem_t sem;
        uint64_t io;
        int thread;
        int size;
        int session;
        char mode;
        char *buf;
        lichbd_ioctx_t ioctx;

        fileid_t fileid;
        uint64_t st_size;
        uint64_t offset;
        int flag;
} prof_net_arg_t;

static void __prof_dio_cli_print(prof_net_arg_t *args, int threads, struct timeval *begin)
{
        int i;
        uint64_t io, used;
        struct timeval now;

        io = 0;
        for (i = 0; i < threads; i++) {
                io += args[i].io;
                DBUG("thead [%u, %u], io %llu\n", args[i].session, args[i].thread, (LLU)args[i].io);
        }

        _gettimeofday(&now, NULL);

        used = _time_used(begin, &now);

        DBUG("io %u, used %u\n", (int)io, (int)used);
        printf("iops : %llu\n", (long long)io * 1000 * 1000 /  used);
}

#if 0
static void __prof_lichbd(void *_arg, void *retval)
{
        int ret;
        prof_net_arg_t *arg = _arg;

        ret = *(int *)retval;
        YASSERT(ret > 0);

        arg->io ++;

        if (arg->mode == 'r') {
                lichbd_read(&arg->ioctx, arg->buf, arg->size, 0, __prof_lichbd, arg);
        } else if (arg->mode == 'w') {
                lichbd_write(&arg->ioctx, arg->buf, arg->size, 0, __prof_lichbd, arg);
        } else
                UNIMPLEMENTED(__DUMP__);
}
#endif

static void __prof_write__(void *arg)
{
        int ret;
        prof_net_arg_t *ctx;
        buffer_t buf;

        ctx = arg;

        mbuffer_init(&buf, ctx->size);
        ret = lichbd_rpc_write(&buf, ctx->size, ctx->offset, ctx->flag);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        ctx->io++;
        ctx->offset += (ctx->size) * _random();
        ctx->offset = ctx->offset % ctx->st_size;

        mbuffer_free(&buf);
        schedule_task_new("lichbd_rpc_write", __prof_write__, ctx, -1);

        return;
}

static void __prof_write(vm_t *vm, prof_net_arg_t *ctx, size_t size)
{
        int ret;

        ctx->size = size;
        ctx->offset = 0;
        ctx->flag = 0;
        ret = vm_request(vm, __prof_write__, ctx, "lichbd_write");
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        return;
}

static void __prof_read__(void *arg)
{
        int ret, retry = 0;
        prof_net_arg_t *ctx;
        buffer_t buf;
        int localized = 0;

        ctx = arg;

        DBUG("read size %u offset %llu\n", (int)ctx->size, (LLU)ctx->offset);

        mbuffer_init(&buf, 0);

retry:
        ret = lichbd_rpc_read(&buf, ctx->size, ctx->offset, localized);
        if (unlikely(ret)) {
                DWARN("retry %u\n", retry);
                usleep(100);
                retry++;
                goto retry;
        }

        ctx->io++;
        ctx->offset += (ctx->size) * _random();
        ctx->offset = ctx->offset % ctx->st_size;

        mbuffer_free(&buf);
        schedule_task_new("lichbd_rpc_write", __prof_read__, ctx, -1);

        return;
}

static void __prof_read(vm_t *vm, prof_net_arg_t *ctx, size_t size)
{
        int ret, retry = 0;

retry:
        ctx->size = size;
        ctx->offset = 0;
        ret = vm_request(vm, __prof_read__, ctx, "lichbd_read");
        if (unlikely(ret)) {
                sleep(1);
                DWARN("retry %u\n", retry);
                retry++;
                goto retry;
        }

        return;
}



int prof_lichbd(int threads, int size, int runtime, char mode, const char *volumes)
{
        int ret, i, s, vol_count, retry = 0;
        prof_net_arg_t _arg[512], *arg;
        struct timeval begin;
        char path[MAX_NAME_LEN], tmp[MAX_NAME_LEN];
        char *array[512];
        lichbd_ioctx_t ioctx;
        struct stat stbuf;

        YASSERT(volumes);

        ret = lichbd_init("");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        strcpy(tmp, volumes);
        _str_split(tmp, ',', array, &vol_count);
        YASSERT(threads * vol_count <= 512);

        for (s = 0; s < vol_count; s++) {
                snprintf(path, MAX_NAME_LEN, "%s", array[s]);

                printf("connect to %s\n", path);
        retry:
                ret = lichbd_connect("default", path, &ioctx, O_CREAT);
                if (unlikely(ret)) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 10, (100 * 1000));
                }

#if 0
                ret = lichbd_truncate(&ioctx.fileid, 1024 * 1024 * 1024);
                if (unlikely(ret)) {
                        if (ret == EPERM) {
                        } else
                                GOTO(err_ret, ret);
                }
#endif

                ret = stor_getattr("default", &ioctx.fileid, &stbuf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                for (i = 0; i < threads; i++) {
                        arg = &_arg[s * threads + i];
                        arg->io = 0;
                        arg->offset = 0;
                        arg->mode  = mode;
                        arg->size = size;
                        arg->st_size = stbuf.st_size;
                        arg->thread = i;
                        arg->session = s;
                        arg->ioctx = ioctx;

                        ret = ymalloc((void **)&arg->buf, size);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        if (arg->mode == 'r') {
                                __prof_read(ioctx.vm, arg, arg->size);
                        } else if (arg->mode == 'w') {
                                __prof_write(ioctx.vm, arg, arg->size);
                        } else
                                UNIMPLEMENTED(__DUMP__);
                }
        }

        _gettimeofday(&begin, NULL);

        i = 0;
        while (i < runtime) {
                sleep(1);
                i++;
                __prof_dio_cli_print(_arg, threads * vol_count, &begin);
        }

        return 0;
err_ret:
        return ret;
}
